package com.atguigu.day03;

import com.atguigu.utils.ClickEvent;
import com.atguigu.utils.ClickSource;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Example job showing {@code flatMap} written as a Java lambda over a stream of
 * {@link ClickEvent}s.
 *
 * <p>Events whose username is "Mary" are emitted twice, events whose username is "Bob" are
 * emitted once, and every other event is dropped.
 */
public class Example1 {
    /**
     * Builds the pipeline (source, flatMap, print) with parallelism 1 and executes it.
     *
     * @param args command-line arguments; not used
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env
                .addSource(new ClickSource())
                .flatMap((ClickEvent in, Collector<ClickEvent> out) -> emitCopies(in, out))
                // Because of type erasure, Collector<ClickEvent> is seen as Collector<Object>,
                // so .returns is needed to tell Flink explicitly what the output type is.
                .returns(Types.POJO(ClickEvent.class))
                .print();

        env.execute();
    }

    /**
     * Forwards {@code event} to {@code out} zero, one, or two times depending on its username.
     *
     * @param event the incoming click event
     * @param out   collector receiving the emitted copies
     */
    private static void emitCopies(ClickEvent event, Collector<ClickEvent> out) {
        // "Mary" is checked first, then "Bob"; any other username yields no output.
        final int copies;
        if (event.username.equals("Mary")) {
            copies = 2;
        } else if (event.username.equals("Bob")) {
            copies = 1;
        } else {
            copies = 0;
        }

        for (int emitted = 0; emitted < copies; emitted++) {
            out.collect(event);
        }
    }
}
